package com.dm.cloud.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.dm.cloud.api.dto.Seckill;
import com.dm.cloud.api.dto.SeckillOrders;
import com.dm.cloud.api.service.ISeckillService;
import com.dm.cloud.common.R;
import com.dm.cloud.dao.SeckillOrdersDao;
import com.dm.cloud.dao.SeckillDao;
import com.dm.cloud.utils.CommonConst;
import com.dm.cloud.utils.mq.rabbitmq.RabbitMQExchanges;
import com.dm.cloud.utils.mq.rabbitmq.RabbitMQRoutingKeys;
import com.dm.cloud.utils.mq.rabbitmq.RabbitPublishUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Flash-sale (seckill) service: accepts order requests, reserves stock at payment time and
 * hands the persistent work to RabbitMQ.
 *
 * <p>State is kept in Redis through Redisson: start/stop time buckets, a remaining-stock
 * counter, and two user-id sets per activity (users with an order, users who have paid).
 * Updates to the sets and the counter are serialized by a per-activity global lock.
 */
@Service
public class SeckillServiceImpl implements ISeckillService {

    @Autowired
    RedissonClient redission;
    @Autowired
    SeckillDao seckillDao;
    @Autowired
    SeckillOrdersDao ordersDao;

    /**
     * Places a flash-sale order for one item.
     *
     * <p>The request is rejected when the activity is outside its time window, when the same
     * user sent another request within the last 500ms, when the user already ordered or paid,
     * or when the stock counter is exhausted. On success the order is published to RabbitMQ
     * (type 0) and the generated order number is returned.
     *
     * @param accountId id of the buying account
     * @param kid       id of the flash-sale activity
     * @return success carrying the order number, or a failure describing the rejection
     * @throws InterruptedException if the thread is interrupted while waiting for a lock
     */
    @Override
    public R seckill(Long accountId,Long kid) throws InterruptedException {
        Date now = new Date();

        // Read each bucket once: get() yields null for a missing key, which avoids both a
        // second round trip and a NullPointerException if the key vanishes between calls.
        RBucket<Date> startTime = redission.getBucket(CommonConst.SECKILL_START_TIMESTEMP + kid);
        Date start = startTime.get();
        if (start == null || now.before(start)) {
            // A missing start time means the activity has not started yet.
            return R.failure("活动未开始!");
        }

        RBucket<Date> stopTime = redission.getBucket(CommonConst.SECKILL_STOP_TIMESTEMP + kid);
        Date stop = stopTime.get();
        if (stop == null || now.after(stop)) {
            // Fail closed: without a known stop time the activity is treated as ended.
            return R.failure("活动已结束!");
        }

        // Per-user lock throttles repeated requests from the same user. It is deliberately
        // never unlocked here: the 500ms lease expires on its own, and any request from the
        // same user inside that window is considered abnormal.
        RLock userLock = redission.getLock(CommonConst.SECKILL_LOCK_USER + kid + ":" + accountId);
        if (!userLock.tryLock(0, 500, TimeUnit.MILLISECONDS)) {
            return R.failure("操作频繁!");
        }

        // Reject users who already completed a purchase.
        RBucket<Set<Long>> succedUsers = redission.getBucket(CommonConst.SECKILL_SUCCEED_USERS + kid);
        Set<Long> paidUsers = succedUsers.get();
        if (paidUsers != null && paidUsers.contains(accountId)) {
            return R.failure("抢购次数已用尽!");
        }

        // Reject users who already hold an unpaid order (cheap pre-check, repeated under lock).
        RBucket<Set<Long>> checkoutUsers = redission.getBucket(CommonConst.SECKILL_ORDER_USERS + kid);
        Set<Long> orderedUsers = checkoutUsers.get();
        if (orderedUsers != null && orderedUsers.contains(accountId)) {
            return R.failure("已有下单记录，请前往支付!");
        }

        // Preliminary stock check so that orders are not accepted once stock is gone.
        RAtomicLong count = redission.getAtomicLong(CommonConst.SECKILL_REMAIN_COUNT + kid);
        if (!count.isExists() || count.get() <= 0) {
            return R.failure("已售罄!");
        }

        // Record the user in the ordered-users set under the activity-wide lock.
        RLock globalLock = redission.getLock(CommonConst.SECKILL_LOCK_GLOBAL + kid);
        if (!globalLock.tryLock(1000, TimeUnit.MILLISECONDS)) {
            return R.failure("活动火爆，请重新尝试!");
        }
        try {
            Set<Long> users = checkoutUsers.get();
            if (users == null) {
                users = new HashSet<>();
            }
            // Re-check under the lock: the user lock's 500ms lease may have expired while
            // this request waited, letting a second request from the same user get here.
            if (!users.add(accountId)) {
                return R.failure("已有下单记录，请前往支付!");
            }
            checkoutUsers.set(users);
        } finally {
            globalLock.unlock();
        }

        // Order number returned to the caller.
        String secOrder = UUID.randomUUID().toString().replace("-", "");
        // Basic order information: account, activity id, quantity.
        SeckillOrders checkout = new SeckillOrders();
        checkout.setOrderNo(secOrder);
        checkout.setAccountId(accountId);
        checkout.setSeckillId(kid);
        checkout.setCount(1);
        checkout.setRabbitMqType(0);
        // Hand the order to the queue for processing.
        RabbitPublishUtils.sendDirectMessage(RabbitMQExchanges.EXCHANGE_A, RabbitMQRoutingKeys.ROUTINGKEY_A, checkout);

        // Return the order number so the front end can refresh its view.
        return R.success(secOrder,"下单成功");
    }

    /**
     * Test endpoint: runs {@link #killpay(String)} in parallel for every order whose
     * {@code status} column is 0.
     */
    @Override
    public void payTest() {
        List<SeckillOrders> orders = ordersDao.selectList(new QueryWrapper<SeckillOrders>().eq("status",0));
        if (CollectionUtils.isNotEmpty(orders)) {
            orders.stream().parallel().forEach(e -> {
                try {
                    killpay(e.getOrderNo());
                } catch (InterruptedException interruptedException) {
                    // Restore the interrupt flag so the executing thread can observe it.
                    Thread.currentThread().interrupt();
                    System.out.println(interruptedException);
                }
            });
        }
    }

    /**
     * Pays for a previously placed flash-sale order.
     *
     * <p>Under the activity-wide lock the Redis stock counter is decremented and the buyer is
     * added to the paid-users set. A message (type 1) is then published so the database stock
     * is reduced asynchronously, and the order row is updated to status 1. If the payment step
     * throws, a compensating message (type 2) is published and the Redis changes are reverted.
     *
     * @param seckillOrder order number returned by {@link #seckill(Long, Long)}
     * @return success, or a failure describing why the payment was rejected
     * @throws InterruptedException if the thread is interrupted while waiting for a lock
     */
    @Override
    public R killpay(String seckillOrder) throws InterruptedException {
        // Load the order from the database rather than trusting caller-supplied details.
        QueryWrapper<SeckillOrders> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("order_no", seckillOrder);
        SeckillOrders checkout = ordersDao.selectOne(queryWrapper);
        if (checkout == null) {
            return R.failure("订单不存在!");
        }
        Long kid = checkout.getSeckillId();
        Long accountId = checkout.getAccountId();

        RBucket<Set<Long>> succedUsers = redission.getBucket(CommonConst.SECKILL_SUCCEED_USERS + kid);

        // Reserve one unit of stock under the global lock so that the check and the
        // decrement are not interleaved with other payments.
        RLock glock = redission.getLock(CommonConst.SECKILL_LOCK_GLOBAL + kid);
        if (!glock.tryLock(1000, TimeUnit.MILLISECONDS)) {
            // Without the lock no stock was reserved, so the payment must not proceed.
            return R.failure("活动火爆，请重新尝试!");
        }
        try {
            Set<Long> paidUsers = succedUsers.get();
            if (paidUsers == null) {
                paidUsers = new HashSet<>();
            }
            if (paidUsers.contains(accountId)) {
                return R.failure("请勿重复付款!");
            }

            RAtomicLong count = redission.getAtomicLong(CommonConst.SECKILL_REMAIN_COUNT + kid);
            if (!count.isExists() || count.get() <= 0) {
                return R.failure("已售罄!");
            }
            count.getAndDecrement();

            // Add the buyer to the set of users who purchased successfully.
            paidUsers.add(accountId);
            succedUsers.set(paidUsers);
        } finally {
            glock.unlock();
        }

        // Database stock deduction is delegated to the message queue.
        checkout.setRabbitMqType(1);
        RabbitPublishUtils.sendDirectMessage(RabbitMQExchanges.EXCHANGE_A, RabbitMQRoutingKeys.ROUTINGKEY_A, checkout);

        try {
            //TODO call the payment interface
        } catch (Exception ex) {
            // Payment failed: restore the stock, in the database via the queue and in Redis.
            checkout.setRabbitMqType(2);
            RabbitPublishUtils.sendDirectMessage(RabbitMQExchanges.EXCHANGE_A, RabbitMQRoutingKeys.ROUTINGKEY_A, checkout);
            revertReservation(glock, succedUsers, kid, accountId);
            return R.failure("支付失败");
        }

        // Mark the order as paid.
        checkout.setStatus(1);
        ordersDao.updateById(checkout);

        return R.success();
    }

    /**
     * Undoes the Redis side of a stock reservation: increments the stock counter and removes
     * the buyer from the paid-users set. Nothing is changed if the global lock cannot be
     * obtained within one second.
     */
    private void revertReservation(RLock glock, RBucket<Set<Long>> succedUsers, Long kid, Long accountId)
            throws InterruptedException {
        if (!glock.tryLock(1000, TimeUnit.MILLISECONDS)) {
            return;
        }
        try {
            RAtomicLong count = redission.getAtomicLong(CommonConst.SECKILL_REMAIN_COUNT + kid);
            count.getAndIncrement();

            Set<Long> paidUsers = succedUsers.get();
            if (paidUsers == null) {
                paidUsers = new HashSet<>();
            } else {
                paidUsers.remove(accountId);
            }
            succedUsers.set(paidUsers);
        } finally {
            glock.unlock();
        }
    }

}
